/*-------------------------------------------------------------------------
 *
 * nodeSeqscan.c
 *      Support routines for sequential scans of relations.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/executor/nodeSeqscan.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *        ExecSeqScan                sequentially scans a relation.
 *        ExecSeqNext                retrieve next tuple in sequential order.
 *        ExecInitSeqScan            creates and initializes a seqscan node.
 *        ExecEndSeqScan            releases any storage allocated.
 *        ExecReScanSeqScan        rescans the relation
 *
 *        ExecSeqScanEstimate        estimates DSM space needed for parallel scan
 *        ExecSeqScanInitializeDSM initialize DSM for parallel scan
 *        ExecSeqScanInitializeWorker attach to DSM info in parallel worker
 */
#include "postgres.h"

#include "access/relscan.h"
#include "executor/execdebug.h"
#include "executor/nodeSeqscan.h"
#include "utils/rel.h"
#ifdef _MLS_
#include "utils/mls.h"
#endif

#ifdef __AUDIT_FGA__
#include "audit/audit_fga.h"
#endif


static void InitScanRelation(SeqScanState *node, EState *estate, int eflags);
static TupleTableSlot *SeqNext(SeqScanState *node);

/* ----------------------------------------------------------------
 *                        Scan Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *        SeqNext
 *
 *        This is a workhorse for ExecSeqScan
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
SeqNext(SeqScanState *node)
{
    HeapTuple    tuple;
    HeapScanDesc scandesc;
    EState       *estate;
    ScanDirection direction;
    TupleTableSlot *slot;

    /*
     * get information from the estate and scan state
     */
    scandesc = node->ss.ss_currentScanDesc;
    estate = node->ss.ps.state;
    direction = estate->es_direction;
    slot = node->ss.ss_ScanTupleSlot;

    if (scandesc == NULL)
    {
        /*
         * We reach here if the scan is not parallel, or if we're executing a
         * scan that was intended to be parallel serially.
         */
        scandesc = heap_beginscan(node->ss.ss_currentRelation,
                                  estate->es_snapshot,
                                  0, NULL);
        if(enable_distri_print)
        {
            elog(LOG, "seq scan snapshot local %d start ts "INT64_FORMAT " rel %s", estate->es_snapshot->local,
                            estate->es_snapshot->start_ts, RelationGetRelationName(node->ss.ss_currentRelation));
        }
        node->ss.ss_currentScanDesc = scandesc;
    }

    /*
     * get the next tuple from the table
     */
    tuple = heap_getnext(scandesc, direction);

    if(enable_distri_debug)
    {
        if(tuple)
        {
            scandesc->rs_scan_number++;
        }
    }


    /*
     * save the tuple and the buffer returned to us by the access methods in
     * our scan tuple slot and return the slot.  Note: we pass 'false' because
     * tuples returned by heap_getnext() are pointers onto disk pages and were
     * not created with palloc() and so should not be pfree()'d.  Note also
     * that ExecStoreTuple will increment the refcount of the buffer; the
     * refcount will not be dropped until the tuple table slot is cleared.
     */
    if (tuple)
        ExecStoreTuple(tuple,    /* tuple to store */
                       slot,    /* slot to store in */
                       scandesc->rs_cbuf,    /* buffer associated with this
                                             * tuple */
                       false);    /* don't pfree this pointer */
    else
        ExecClearTuple(slot);

    return slot;
}

/*
 * SeqRecheck -- access method routine to recheck a tuple in EvalPlanQual
 */
static bool
SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
{
    /*
     * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
     * (and this is very bad) - so, here we do not check are keys ok or not.
     */
    return true;
}

/* ----------------------------------------------------------------
 *        ExecSeqScan(node)
 *
 *        Scans the relation sequentially and returns the next qualifying
 *        tuple.
 *        We call the ExecScan() routine and pass it the appropriate
 *        access method functions.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecSeqScan(PlanState *pstate)
{
    SeqScanState *node = castNode(SeqScanState, pstate);

    return ExecScan(&node->ss,
                    (ExecScanAccessMtd) SeqNext,
                    (ExecScanRecheckMtd) SeqRecheck);
}

/* ----------------------------------------------------------------
 *        InitScanRelation
 *
 *        Set up to access the scan relation.
 * ----------------------------------------------------------------
 */
static void
InitScanRelation(SeqScanState *node, EState *estate, int eflags)
{
    Relation    currentRelation;

    /*
     * get the relation object id from the relid'th entry in the range table,
     * open that relation and acquire appropriate lock on it.
     */
#ifdef __TBASE__
    /* if interval partition, scan child table instead */
    if(((SeqScan *) node->ss.ps.plan)->ispartchild)
    {
        currentRelation = ExecOpenScanRelationPartition(estate,
                                      ((SeqScan *) node->ss.ps.plan)->scanrelid,
                                           eflags,
                                           ((SeqScan *) node->ss.ps.plan)->childidx);
    }
    else
    {
#endif
    currentRelation = ExecOpenScanRelation(estate,
                                           ((SeqScan *) node->ss.ps.plan)->scanrelid,
                                           eflags);
#ifdef __TBASE__
    }
#endif
#ifdef _MLS_
    mls_check_datamask_need_passby((ScanState*)node, currentRelation->rd_id);
#endif 

    node->ss.ss_currentRelation = currentRelation;

    /* and report the scan tuple slot's rowtype */
    ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
}


/* ----------------------------------------------------------------
 *        ExecInitSeqScan
 * ----------------------------------------------------------------
 */
SeqScanState *
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
{
    SeqScanState *scanstate;

#ifdef __AUDIT_FGA__
    ListCell      *item;
#endif

    /*
     * Once upon a time it was possible to have an outerPlan of a SeqScan, but
     * not any more.
     */
    Assert(outerPlan(node) == NULL);
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    scanstate = makeNode(SeqScanState);
    scanstate->ss.ps.plan = (Plan *) node;
    scanstate->ss.ps.state = estate;
    scanstate->ss.ps.ExecProcNode = ExecSeqScan;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &scanstate->ss.ps);

    /*
     * initialize child expressions
     */
    scanstate->ss.ps.qual =
        ExecInitQual(node->plan.qual, (PlanState *) scanstate);
    
#ifdef __AUDIT_FGA__
    if (enable_fga)
    {
        foreach (item, node->plan.audit_fga_quals)
        {
            AuditFgaPolicy *audit_fga_qual = (AuditFgaPolicy *) lfirst(item);
            
            audit_fga_policy_state * audit_fga_policy_state_item
                    = palloc0(sizeof(audit_fga_policy_state));

            audit_fga_policy_state_item->policy_name = audit_fga_qual->policy_name;
            audit_fga_policy_state_item->query_string = audit_fga_qual->query_string;
            audit_fga_policy_state_item->qual = 
                ExecInitQual(audit_fga_qual->qual, (PlanState *) scanstate);

            scanstate->ss.ps.audit_fga_qual = 
                lappend(scanstate->ss.ps.audit_fga_qual, audit_fga_policy_state_item);      
        }
    }
#endif

    /*
     * tuple table initialization
     */
    ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
    ExecInitScanTupleSlot(estate, &scanstate->ss);

    /*
     * initialize scan relation
     */
    InitScanRelation(scanstate, estate, eflags);

    /*
     * Initialize result tuple type and projection info.
     */
    ExecAssignResultTypeFromTL(&scanstate->ss.ps);
    ExecAssignScanProjectionInfo(&scanstate->ss);

    return scanstate;
}

/* ----------------------------------------------------------------
 *        ExecEndSeqScan
 *
 *        frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndSeqScan(SeqScanState *node)
{
    Relation    relation;
    HeapScanDesc scanDesc;

    /*
     * get information from node
     */
    relation = node->ss.ss_currentRelation;
    scanDesc = node->ss.ss_currentScanDesc;

    /*
     * Free the exprcontext
     */
    ExecFreeExprContext(&node->ss.ps);

    /*
     * clean out the tuple table
     */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
    ExecClearTuple(node->ss.ss_ScanTupleSlot);

    /*
     * close heap scan
     */
    if (scanDesc != NULL)
        heap_endscan(scanDesc);

    /*
     * close the heap relation.
     */
    ExecCloseScanRelation(relation);
}

/* ----------------------------------------------------------------
 *                        Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *        ExecReScanSeqScan
 *
 *        Rescans the relation.
 * ----------------------------------------------------------------
 */
void
ExecReScanSeqScan(SeqScanState *node)
{
    HeapScanDesc scan;

    scan = node->ss.ss_currentScanDesc;

    if (scan != NULL)
        heap_rescan(scan,        /* scan desc */
                    NULL);        /* new scan keys */

    ExecScanReScan((ScanState *) node);
}

/* ----------------------------------------------------------------
 *                        Parallel Scan Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *        ExecSeqScanEstimate
 *
 *        estimates the space required to serialize seqscan node.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanEstimate(SeqScanState *node,
                    ParallelContext *pcxt)
{
    EState       *estate = node->ss.ps.state;

    node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot);
    shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *        ExecSeqScanInitializeDSM
 *
 *        Set up a parallel heap scan descriptor.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanInitializeDSM(SeqScanState *node,
                         ParallelContext *pcxt)
{
    EState       *estate = node->ss.ps.state;
    ParallelHeapScanDesc pscan;

    pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
    heap_parallelscan_initialize(pscan,
                                 node->ss.ss_currentRelation,
                                 estate->es_snapshot);
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
    node->ss.ss_currentScanDesc =
        heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
}

/* ----------------------------------------------------------------
 *        ExecSeqScanInitializeWorker
 *
 *        Copy relevant information from TOC into planstate.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
{
    ParallelHeapScanDesc pscan;

    pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
    node->ss.ss_currentScanDesc =
        heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
}
